{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# appsink方式\n",
    "\n",
    "\n",
    "[https://www.ardusub.com/developers/opencv.html](https://www.ardusub.com/developers/opencv.html)\n",
    "\n",
    "## python"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import cv2\n",
    "import gi\n",
    "import numpy as np\n",
    "\n",
    "gi.require_version('Gst', '1.0')\n",
    "from gi.repository import Gst\n",
    "\n",
    "\n",
    "class Video():\n",
    "    \"\"\"BlueRov video capture class constructor\n",
    "\n",
    "    Attributes:\n",
    "        port (int): Video UDP port\n",
    "        video_codec (string): Source h264 parser\n",
    "        video_decode (string): Transform YUV (12bits) to BGR (24bits)\n",
    "        video_pipe (object): GStreamer top-level pipeline\n",
    "        video_sink (object): Gstreamer sink element\n",
    "        video_sink_conf (string): Sink configuration\n",
    "        video_source (string): Udp source ip and port\n",
    "    \"\"\"\n",
    "    def __init__(self, port=5600):\n",
    "        \"\"\"Summary\n",
    "\n",
    "        Args:\n",
    "            port (int, optional): UDP port\n",
    "        \"\"\"\n",
    "\n",
    "        Gst.init(None)\n",
    "\n",
    "        self.port = port\n",
    "        self._frame = None\n",
    "\n",
    "        # [Software component diagram](https://www.ardusub.com/software/components.html)\n",
    "        # UDP video stream (:5600)\n",
    "        self.video_source = 'udpsrc port={}'.format(self.port)\n",
    "        # [Rasp raw image](http://picamera.readthedocs.io/en/release-0.7/recipes2.html#raw-image-capture-yuv-format)\n",
    "        # Cam -> CSI-2 -> H264 Raw (YUV 4-4-4 (12bits) I420)\n",
    "        self.video_codec = '! application/x-rtp, payload=96 ! rtph264depay ! h264parse ! avdec_h264'\n",
    "        # Python don't have nibble, convert YUV nibbles (4-4-4) to OpenCV standard BGR bytes (8-8-8)\n",
    "        self.video_decode = \\\n",
    "            '! decodebin ! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert'\n",
    "        # Create a sink to get data\n",
    "        self.video_sink_conf = \\\n",
    "            '! appsink emit-signals=true sync=false max-buffers=2 drop=true'\n",
    "\n",
    "        self.video_pipe = None\n",
    "        self.video_sink = None\n",
    "\n",
    "        self.run()\n",
    "\n",
    "    def start_gst(self, config=None):\n",
    "        \"\"\" Start gstreamer pipeline and sink\n",
    "        Pipeline description list e.g:\n",
    "            [\n",
    "                'videotestsrc ! decodebin', \\\n",
    "                '! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert',\n",
    "                '! appsink'\n",
    "            ]\n",
    "\n",
    "        Args:\n",
    "            config (list, optional): Gstreamer pileline description list\n",
    "        \"\"\"\n",
    "\n",
    "        if not config:\n",
    "            config = \\\n",
    "                [\n",
    "                    'videotestsrc ! decodebin',\n",
    "                    '! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert',\n",
    "                    '! appsink'\n",
    "                ]\n",
    "\n",
    "        command = ' '.join(config)\n",
    "        self.video_pipe = Gst.parse_launch(command)\n",
    "        self.video_pipe.set_state(Gst.State.PLAYING)\n",
    "        self.video_sink = self.video_pipe.get_by_name('appsink0')\n",
    "\n",
    "    @staticmethod\n",
    "    def gst_to_opencv(sample):\n",
    "        \"\"\"Transform byte array into np array\n",
    "\n",
    "        Args:\n",
    "            sample (TYPE): Description\n",
    "\n",
    "        Returns:\n",
    "            TYPE: Description\n",
    "        \"\"\"\n",
    "        buf = sample.get_buffer()\n",
    "        caps = sample.get_caps()\n",
    "        array = np.ndarray((caps.get_structure(0).get_value('height'),\n",
    "                            caps.get_structure(0).get_value('width'), 3),\n",
    "                           buffer=buf.extract_dup(0, buf.get_size()),\n",
    "                           dtype=np.uint8)\n",
    "        return array\n",
    "\n",
    "    def frame(self):\n",
    "        \"\"\" Get Frame\n",
    "\n",
    "        Returns:\n",
    "            iterable: bool and image frame, cap.read() output\n",
    "        \"\"\"\n",
    "        return self._frame\n",
    "\n",
    "    def frame_available(self):\n",
    "        \"\"\"Check if frame is available\n",
    "\n",
    "        Returns:\n",
    "            bool: true if frame is available\n",
    "        \"\"\"\n",
    "        return type(self._frame) != type(None)\n",
    "\n",
    "    def run(self):\n",
    "        \"\"\" Get frame to update _frame\n",
    "        \"\"\"\n",
    "\n",
    "        self.start_gst([\n",
    "            self.video_source, self.video_codec, self.video_decode,\n",
    "            self.video_sink_conf\n",
    "        ])\n",
    "\n",
    "        self.video_sink.connect('new-sample', self.callback)\n",
    "\n",
    "    def callback(self, sink):\n",
    "        sample = sink.emit('pull-sample')\n",
    "        new_frame = self.gst_to_opencv(sample)\n",
    "        self._frame = new_frame\n",
    "\n",
    "        return Gst.FlowReturn.OK\n",
    "\n",
    "\n",
    "if __name__ == '__main__':\n",
    "    # Create the video object\n",
    "    # Add port= if is necessary to use a different one\n",
    "    video = Video()\n",
    "\n",
    "    while True:\n",
    "        # Wait for the next frame\n",
    "        if not video.frame_available():\n",
    "            continue\n",
    "\n",
    "        frame = video.frame()\n",
    "        cv2.imshow('frame', frame)\n",
    "        if cv2.waitKey(1) & 0xFF == ord('q'):\n",
    "            break"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# C++"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```c++\n",
    "// Include atomic std library\n",
    "#include <atomic>\n",
    "\n",
    "// Include gstreamer library\n",
    "#include <gst/gst.h>\n",
    "#include <gst/app/app.h>\n",
    "\n",
    "// Include OpenCV library\n",
    "#include <opencv.hpp>\n",
    "\n",
    "// Share frame between main loop and gstreamer callback\n",
    "std::atomic<cv::Mat*> atomicFrame;\n",
    "\n",
    "/**\n",
    " * @brief Check preroll to get a new frame using callback\n",
    " *  https://gstreamer.freedesktop.org/documentation/design/preroll.html\n",
    " * @return GstFlowReturn\n",
    " */\n",
    "GstFlowReturn new_preroll(GstAppSink* /*appsink*/, gpointer /*data*/)\n",
    "{\n",
    "    return GST_FLOW_OK;\n",
    "}\n",
    "\n",
    "/**\n",
    " * @brief This is a callback that get a new frame when a preroll exist\n",
    " *\n",
    " * @param appsink\n",
    " * @return GstFlowReturn\n",
    " */\n",
    "GstFlowReturn new_sample(GstAppSink *appsink, gpointer /*data*/)\n",
    "{\n",
    "    static int framecount = 0;\n",
    "\n",
    "    // Get caps and frame\n",
    "    GstSample *sample = gst_app_sink_pull_sample(appsink);\n",
    "    GstCaps *caps = gst_sample_get_caps(sample);\n",
    "    GstBuffer *buffer = gst_sample_get_buffer(sample);\n",
    "    GstStructure *structure = gst_caps_get_structure(caps, 0);\n",
    "    const int width = g_value_get_int(gst_structure_get_value(structure, \"width\"));\n",
    "    const int height = g_value_get_int(gst_structure_get_value(structure, \"height\"));\n",
    "\n",
    "    // Print dot every 30 frames\n",
    "    if(!(framecount%30)) {\n",
    "        g_print(\".\");\n",
    "    }\n",
    "\n",
    "    // Show caps on first frame\n",
    "    if(!framecount) {\n",
    "        g_print(\"caps: %s\\n\", gst_caps_to_string(caps));\n",
    "    }\n",
    "    framecount++;\n",
    "\n",
    "    // Get frame data\n",
    "    GstMapInfo map;\n",
    "    gst_buffer_map(buffer, &map, GST_MAP_READ);\n",
    "\n",
    "    // Convert gstreamer data to OpenCV Mat\n",
    "    cv::Mat* prevFrame;\n",
    "    prevFrame = atomicFrame.exchange(new cv::Mat(cv::Size(width, height), CV_8UC3, (char*)map.data, cv::Mat::AUTO_STEP));\n",
    "    if(prevFrame) {\n",
    "        delete prevFrame;\n",
    "    }\n",
    "\n",
    "    gst_buffer_unmap(buffer, &map);\n",
    "    gst_sample_unref(sample);\n",
    "\n",
    "    return GST_FLOW_OK;\n",
    "}\n",
    "\n",
    "/**\n",
    " * @brief Bus callback\n",
    " *  Print important messages\n",
    " *\n",
    " * @param bus\n",
    " * @param message\n",
    " * @param data\n",
    " * @return gboolean\n",
    " */\n",
    "static gboolean my_bus_callback(GstBus *bus, GstMessage *message, gpointer data)\n",
    "{\n",
    "    // Debug message\n",
    "    //g_print(\"Got %s message\\n\", GST_MESSAGE_TYPE_NAME(message));\n",
    "    switch(GST_MESSAGE_TYPE(message)) {\n",
    "        case GST_MESSAGE_ERROR: {\n",
    "            GError *err;\n",
    "            gchar *debug;\n",
    "\n",
    "            gst_message_parse_error(message, &err, &debug);\n",
    "            g_print(\"Error: %s\\n\", err->message);\n",
    "            g_error_free(err);\n",
    "            g_free(debug);\n",
    "            break;\n",
    "        }\n",
    "        case GST_MESSAGE_EOS:\n",
    "            /* end-of-stream */\n",
    "            break;\n",
    "        default:\n",
    "            /* unhandled message */\n",
    "            break;\n",
    "    }\n",
    "    /* we want to be notified again the next time there is a message\n",
    "     * on the bus, so returning TRUE (FALSE means we want to stop watching\n",
    "     * for messages on the bus and our callback should not be called again)\n",
    "     */\n",
    "    return true;\n",
    "}\n",
    "\n",
    "int main(int argc, char *argv[]) {\n",
    "    gst_init(&argc, &argv);\n",
    "\n",
    "    gchar *descr = g_strdup(\n",
    "        \"udpsrc port=5600 \"\n",
    "        \"! application/x-rtp, payload=96 ! rtph264depay ! h264parse ! avdec_h264 \"\n",
    "        \"! decodebin ! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert \"\n",
    "        \"! appsink name=sink emit-signals=true sync=false max-buffers=1 drop=true\"\n",
    "    );\n",
    "\n",
    "    // Check pipeline\n",
    "    GError *error = nullptr;\n",
    "    GstElement *pipeline = gst_parse_launch(descr, &error);\n",
    "\n",
    "    if(error) {\n",
    "        g_print(\"could not construct pipeline: %s\\n\", error->message);\n",
    "        g_error_free(error);\n",
    "        exit(-1);\n",
    "    }\n",
    "\n",
    "    // Get sink\n",
    "    GstElement *sink = gst_bin_get_by_name(GST_BIN(pipeline), \"sink\");\n",
    "\n",
    "    /**\n",
    "     * @brief Get sink signals and check for a preroll\n",
    "     *  If preroll exists, we do have a new frame\n",
    "     */\n",
    "    gst_app_sink_set_emit_signals((GstAppSink*)sink, true);\n",
    "    gst_app_sink_set_drop((GstAppSink*)sink, true);\n",
    "    gst_app_sink_set_max_buffers((GstAppSink*)sink, 1);\n",
    "    GstAppSinkCallbacks callbacks = { nullptr, new_preroll, new_sample };\n",
    "    gst_app_sink_set_callbacks(GST_APP_SINK(sink), &callbacks, nullptr, nullptr);\n",
    "\n",
    "    // Declare bus\n",
    "    GstBus *bus;\n",
    "    bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));\n",
    "    gst_bus_add_watch(bus, my_bus_callback, nullptr);\n",
    "    gst_object_unref(bus);\n",
    "\n",
    "    gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_PLAYING);\n",
    "\n",
    "    // Main loop\n",
    "    while(1) {\n",
    "        g_main_iteration(false);\n",
    "\n",
    "        cv::Mat* frame = atomicFrame.load();\n",
    "        if(frame) {\n",
    "            cv::imshow(\"Frame\", atomicFrame.load()[0]);\n",
    "            cv::waitKey(30);\n",
    "        }\n",
    "    }\n",
    "\n",
    "    gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_NULL);\n",
    "    gst_object_unref(GST_OBJECT(pipeline));\n",
    "    return 0;\n",
    "}\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# VideoCapture+appsink 方式"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## demo1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import cv2\n",
    "from multiprocessing import Process\n",
    "\n",
    "\n",
    "def send():\n",
    "    cap_send = cv2.VideoCapture(\n",
    "        'videotestsrc ! video/x-raw,framerate=20/1 ! videoscale ! videoconvert ! appsink',\n",
    "        cv2.CAP_GSTREAMER)\n",
    "    out_send = cv2.VideoWriter(\n",
    "        'appsrc ! videoconvert ! x264enc tune=zerolatency bitrate=500 speed-preset=superfast ! rtph264pay ! udpsink host=127.0.0.1 port=5000',\n",
    "        cv2.CAP_GSTREAMER, 0, 20, (320, 240), True)\n",
    "\n",
    "    if not cap_send.isOpened() or not out_send.isOpened():\n",
    "        print('VideoCapture or VideoWriter not opened')\n",
    "        exit(0)\n",
    "\n",
    "    while True:\n",
    "        ret, frame = cap_send.read()\n",
    "\n",
    "        if not ret:\n",
    "            print('empty frame')\n",
    "            break\n",
    "\n",
    "        out_send.write(frame)\n",
    "\n",
    "        cv2.imshow('send', frame)\n",
    "        if cv2.waitKey(1) & 0xFF == ord('q'):\n",
    "            break\n",
    "\n",
    "    cap_send.release()\n",
    "    out_send.release()\n",
    "\n",
    "\n",
    "def receive():\n",
    "    cap_receive = cv2.VideoCapture(\n",
    "        'udpsrc port=5000 caps = \"application/x-rtp, media=(string)video, clock-rate=(int)90000, encoding-name=(string)H264, payload=(int)96\" ! rtph264depay ! decodebin ! videoconvert ! appsink',\n",
    "        cv2.CAP_GSTREAMER)\n",
    "\n",
    "    if not cap_receive.isOpened():\n",
    "        print('VideoCapture not opened')\n",
    "        exit(0)\n",
    "\n",
    "    while True:\n",
    "        ret, frame = cap_receive.read()\n",
    "\n",
    "        if not ret:\n",
    "            print('empty frame')\n",
    "            break\n",
    "\n",
    "        cv2.imshow('receive', frame)\n",
    "        if cv2.waitKey(1) & 0xFF == ord('q'):\n",
    "            break\n",
    "\n",
    "    #cap_receive.release()\n",
    "\n",
    "\n",
    "if __name__ == '__main__':\n",
    "    s = Process(target=send)\n",
    "    r = Process(target=receive)\n",
    "    s.start()\n",
    "    r.start()\n",
    "    s.join()\n",
    "    r.join()\n",
    "\n",
    "    cv2.destroyAllWindows()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## demo2\n",
    "\n",
    "参考：https://blog.csdn.net/zong596568821xp/article/details/80306987"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "import cv2 as cv\n",
    "\n",
    "gst_str = 'v4l2src device=/dev/video0 ! '\n",
    "               'video/x-raw, width=(int)1080, height=(int)960, '\n",
    "               'format=(string)RGB ! '\n",
    "               'videoconvert ! appsink'\n",
    "\n",
    "# gst_str = 'videotestsrc ! video/x-raw,framerate=20/1 ! videoconvert ! appsink'\n",
    "\n",
    "cap = cv.VideoCapture(gst_str, cv.CAP_GSTREAMER)\n",
    "\n",
    "if not cap.isOpened():\n",
    "    print(\"Cannot capture from camera. Exiting.\")\n",
    "    os._exit()\n",
    "last_time = time.time()\n",
    "\n",
    "while(True):\n",
    "\n",
    "    ret, frame = cap.read()\n",
    "    cv.imshow('frame', frame)\n",
    "\n",
    "    if cv.waitKey(1) & 0xFF == ord('q'):\n",
    "        break\n",
    "\n",
    "cap.release()\n",
    "cv.destroyAllWindows()\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.3"
  },
  "toc": {
   "base_numbering": 1,
   "nav_menu": {},
   "number_sections": true,
   "sideBar": true,
   "skip_h1_title": false,
   "title_cell": "Table of Contents",
   "title_sidebar": "Contents",
   "toc_cell": false,
   "toc_position": {
    "height": "calc(100% - 180px)",
    "left": "10px",
    "top": "150px",
    "width": "187.818px"
   },
   "toc_section_display": true,
   "toc_window_display": true
  },
  "varInspector": {
   "cols": {
    "lenName": 16,
    "lenType": 16,
    "lenVar": 40
   },
   "kernels_config": {
    "python": {
     "delete_cmd_postfix": "",
     "delete_cmd_prefix": "del ",
     "library": "var_list.py",
     "varRefreshCmd": "print(var_dic_list())"
    },
    "r": {
     "delete_cmd_postfix": ") ",
     "delete_cmd_prefix": "rm(",
     "library": "var_list.r",
     "varRefreshCmd": "cat(var_dic_list()) "
    }
   },
   "types_to_exclude": [
    "module",
    "function",
    "builtin_function_or_method",
    "instance",
    "_Feature"
   ],
   "window_display": false
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
